Databricks

Supported Databricks Runtime versions: 12.2 - 16.4 (Scala 2.12)

Note: Databricks Serverless is not supported by this instrumentation. You may optionally use the dbt agent instead.

To enable integration with definity on Databricks, follow these steps:

  1. Attach the Spark Agent JAR to your compute cluster.
  2. Configure jobs or tasks with definity parameters.

Cluster Configuration

1. Create an Init Script

Create a script to download and add the definity Spark agent to the cluster’s CLASSPATH and set the default definity parameters. Save this script in cloud storage (e.g., S3).

definity_init.sh
#!/bin/bash
set -e

JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -fsSL -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"
export CLASSPATH=$CLASSPATH:$JAR_DIR/definity-spark-agent.jar

cat > /databricks/driver/conf/00-definity.conf << EOF
spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server="https://app.definity.run"
spark.definity.api.token=YOUR_TOKEN
#spark.definity.env.name=YOUR_DEFAULT_ENV
EOF

2. Attach the Init Script to Your Compute Cluster

In the Databricks UI:

  1. Go to Cluster configuration → Advanced options → Init Scripts.
  2. Add your script with:
    • Source: s3
    • File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh

3. Configure Spark Cluster Name (Optional)

By default, the compute name is taken from the Databricks cluster name.

Navigate to Cluster configuration → Advanced options → Spark and add:

spark.definity.compute.name      my_cluster_name

Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.


Job Configuration

By default, the Databricks job name is used as the pipeline name, and the task key as the task name. If needed, you can override these settings in the job configuration:

Example: Airflow Notebook Job

run_notebook = DatabricksSubmitRunOperator(
    task_id="run_notebook",
    json={
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/my_notebook",
            "base_parameters": {
                "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit": "{{ ts }}",
                "spark.definity.task.name": "{{ ti.task_id }}"
            },
        },
        "name": "notebook-job",
    }
)

Example: Airflow Python Job

run_python = DatabricksSubmitRunOperator(
    task_id="run_python_script",
    json={
        "spark_python_task": {
            "python_file": "dbfs:/path/to/job.py",
            "parameters": [
                "spark.definity.pipeline.name={{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit={{ ts }}",
                "spark.definity.task.name={{ ti.task_id }}"
            ]
        },
        "name": "python-job",
    }
)
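Because the spark_python_task parameters list mixes the definity entries with the job's own arguments, the script itself also receives them via sys.argv. If your script parses its arguments, it may want to skip the definity entries first. A minimal sketch — split_definity_args is an illustrative helper, not part of the agent's API:

```python
import sys

def split_definity_args(argv):
    """Separate spark.definity.* entries (consumed by the agent) from the job's own args."""
    definity = [a for a in argv if a.startswith("spark.definity.")]
    own = [a for a in argv if not a.startswith("spark.definity.")]
    return definity, own

if __name__ == "__main__":
    _, job_args = split_definity_args(sys.argv[1:])
    # ... parse job_args as usual ...
```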

Example: Manual Task Scope Configuration

You can manually set task scope in your code.

When doing so, set the following Spark config at the cluster level to disable automatic session detection:

spark.definity.databricks.automaticSessions.enabled=false

Basic Example

# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")

Advanced Example

For multiple logical tasks in a single session, unset the property when the task ends:

try {
  // your job logic here
  ...
} finally {
  // Unset the session to signal task completion (recommended in a `finally` block to catch failures)
  spark.conf.unset("spark.definity.session")
}
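The same pattern can be expressed in PySpark with a context manager, so the property is always unset even when the task fails. A sketch assuming `spark` is an active SparkSession; the helper names are illustrative:

```python
from contextlib import contextmanager

def definity_session_value(pipeline, pit, task):
    # Build the value expected by the spark.definity.session property
    return f"pipeline.name={pipeline},pipeline.pit={pit},task.name={task}"

@contextmanager
def definity_task(spark, pipeline, pit, task):
    # Scope one logical task; the finally block mirrors the Scala example above
    spark.conf.set("spark.definity.session", definity_session_value(pipeline, pit, task))
    try:
        yield
    finally:
        spark.conf.unset("spark.definity.session")

# Usage:
# with definity_task(spark, "my_pipeline", "2025-01-01 01:00:00", "task1"):
#     run_task_logic()
```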

Note: This is not required for Python script jobs and notebook jobs.


Example: Jobs API

definity parameters can be passed via the base_parameters or parameters fields, depending on the task type.

{
  "tasks": [
    {
      "task_key": "task1",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_1",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task1"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "task2",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_2",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task2"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "python_task1",
      "spark_python_task": {
        "python_file": "s3://my-bucket/python_task.py",
        "parameters": [
          "yourArg1",
          "yourArg2",
          "spark.definity.task.name=python_task_1",
          "spark.definity.pipeline.name=my_pipeline",
          "spark.definity.pipeline.pit=2025-01-01 01:00:00"
        ]
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    }
  ],
  "format": "MULTI_TASK",
  "queue": {
    "enabled": true
  }
}
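A spec like the one above can be submitted with a plain HTTP call to the Jobs API's jobs/create endpoint. A minimal standard-library sketch — the host and token are placeholders you must supply, and error handling is omitted:

```python
import json
import urllib.request

def build_create_request(host, token, job_spec):
    # Build the POST request for the Databricks Jobs API 2.1 create endpoint
    return urllib.request.Request(
        f"{host}/api/2.1/jobs/create",
        data=json.dumps(job_spec).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )

def create_job(host, token, job_spec):
    # Submit the spec and return the API response (e.g. a job_id)
    with urllib.request.urlopen(build_create_request(host, token, job_spec)) as resp:
        return json.load(resp)
```

The Databricks CLI or SDK can perform the same operation; this sketch only shows where the definity parameters end up in the request body.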